CREATE DYNAMIC TABLE

本文为您介绍如何创建Dynamic Table。

使用限制

Dynamic Table的使用限制请参见Dynamic Table支持范围和限制

语法

建表语法

Dynamic Table的建表语法如下:

CREATE DYNAMIC TABLE [IF NOT EXISTS] <schema.tablename>(
[col_name],
[col_name]
  ) [PARTITION BY LIST (col_name)]
WITH (
    [refresh_mode='[full|incremental]',]
    [auto_refresh_enable='[true|false',]

  --增量刷新专用参数:
    [incremental_auto_refresh_schd_start_time='[immediate|<timestamptz>]',]
    [incremental_auto_refresh_interval='[<num> {minute|minutes|hour|hours]',]  
    [incremental_guc_hg_computing_resource='[ local | serverless]',]
    [incremental_guc_hg_experimental_serverless_computing_required_cores='<num>',]

   --全量刷新专用参数:
    [full_auto_refresh_schd_start_time='[immediate|<timestamptz>]',]
    [full_auto_refresh_interval='[<num> {minute|minutes|hour|hours]',] 
    [full_guc_hg_computing_resource='[ local | serverless]',]--hg_full_refresh_computing_resource默认serverless,可以db级别设置,用户可以不设置
    [full_guc_hg_experimental_serverless_computing_required_cores='<num>',]
    
   --共用参数,允许设置guc:
   [refresh_guc_<guc>='xxx]',] 
   
  -- Dynamic Table的通用属性:
    [orientation = '[column | row | row,column]',]
    [table_group = '[tableGroupName]',]
    [distribution_key = 'columnName[,...]]',]
    [clustering_key = '[columnName{:asc]} [,...]]',]
    [event_time_column = '[columnName [,...]]',]
    [bitmap_columns = '[columnName [,...]]',]
    [dictionary_encoding_columns = '[columnName [,...]]',]
    [time_to_live_in_seconds = '<non_negative_literal>',]
    [storage_mode = '[hot | cold]']
    ) 
AS
<query> --query的定义

参数说明

刷新模式与资源

参数分类

参数名

描述

是否必填

默认值

共用刷新参数

refresh_mode

指定数据的刷新模式,支持full、incremental两种刷新模式。

若不设置该参数,则表示不进行刷新。

auto_refresh_enable

是否开启自动刷新。取值为:

  • true:开启自动刷新。

  • false:不开启自动刷新。

false

refresh_guc_<guc>

支持对Refresh设置GUC参数,支持的GUC请参见GUC参数

增量刷新(incremental)

incremental_auto_refresh_schd_start_time

增量刷新的开始时间。取值为:

  • immediate:默认值,建表成功后立即启动增量刷新。

  • <timestamptz>:自定义的增量刷新开始时间,例如设置为2024-08-24 1:00,则表示2024-08-24 1:00开始执行刷新任务。

immediate

incremental_auto_refresh_interval

增量刷新的时间间隔,单位有minute、minutes、hourhours。

  • 取值区间为[1min,48hours]。

  • 若不设置该参数,则表示只会在Refresh开始时间执行一次刷新操作。

incremental_guc_hg_computing_resource

指定执行增量刷新的计算资源,取值为:

  • local:本实例资源。

  • serverless:使用Serverless资源,但需注意查看实例是否满足Serverless需求,详情请参见Serverless Computing使用指南

说明

支持使用ALTER DATABASE xxx SET incremental_guc_hg_computing_resource=xx命令在DB级别设置执行增量刷新的计算资源。

serverless

incremental_guc_hg_experimental_serverless_computing_required_cores

如果使用Serverless资源执行刷新,则需要设置刷新的计算资源量。

说明

不同规格的实例可使用的Serverless资源有一定的限制,详情请参见Serverless Computing使用指南

全量刷新(full)

full_auto_refresh_schd_start_time

全量刷新的开始时间。取值为:

  • immediate:默认值,建表成功后立即启动全量刷新。

  • <timestamptz>:自定义的全量刷新开始时间,例如设置为2024-08-24 1:00,则表示2024-08-24 1:00开始执行刷新任务。

immediate

full_auto_refresh_interval

全量刷新的时间间隔,单位有minute、minutes、hourhours。

  • 取值区间为[1min,48hours]。

  • 若不设置该参数,则表示只会在Refresh开始时间执行一次刷新操作。

full_guc_hg_computing_resource

指定执行全量刷新的计算资源,取值为:

  • local:本实例资源。

  • serverless:使用Serverless资源,但需注意查看实例是否满足Serverless需求,详情请参见Serverless Computing使用指南

说明

支持使用ALTER DATABASE xxx SET full_guc_hg_computing_resource=xx命令在DB级别设置执行全量刷新的计算资源。

serverless

full_guc_hg_experimental_serverless_computing_required_cores

如果使用Serverless执行刷新,则需要设置刷新的计算资源量。

说明

不同规格的实例可使用的Serverless资源有一定的限制,详情请参见Serverless Computing使用指南

表属性参数

参数

描述

是否必填

默认值

full

incremental

col_name

Dynamic Table的字段名。

可以显式指定Dynamic Table的列名,但是不能指定列的属性和数据类型,引擎会自动推导。

说明

若指定了列的属性和数据类型,会导致引擎推导不正确。

Query列名

Query列名

orientation

指定Dynamic Table的存储模式,取值如下:

  • column:列存。

  • row:行存。

  • row,column:行列共存。

column

column

table_group

指定Dynamic Table所在的Table Group,默认为当前数据库下的Default Table Group,详情请参见Table GroupShard Count操作指南

默认Table Group名称

默认Table Group名称

distribution_key

指定Dynamic TableDistribution Key,详情请参见分布键Distribution Key

clustering_key

指定Dynamic TableClustering Key,详情请参见聚簇索引Clustering Key

允许设置,有默认推导值。

允许设置,有默认推导值。

event_time_column

指定Dynamic Tableevent_time_column,详情请参见Event Time Column(Segment Key)

bitmap_columns

指定Dynamic Tablebitmap_columns,详情请参见位图索引Bitmap

TEXT类型字段

TEXT类型字段

dictionary_encoding_columns

指定Dynamic Tabledictionary_encoding_columns,详情请参见字典编码Dictionary Encoding

TEXT类型字段

TEXT类型字段

time_to_live_in_seconds

指定Dynamic Table数据的生命周期。

永久

永久

storage_mode

Dynamic Table的存储模式,取值如下:

  • hot:热存,

  • cold:冷存。

说明

存储模式详情请参见数据分层存储

hot

hot

Query

Dynamic Table中数据生成的Query,设置的刷新模式不同,支持的Query类型和基表类型也不同,详情请参见Dynamic Table支持范围和限制

全量刷新

全量刷新会将Query中的数据以全量的方式写入Dynamic Table。相比于增量刷新,全量刷新的优势在于:

  • 支持更多的基表类型。

  • 支持更丰富的Query类型、算子支持等。

全量刷新相比增量刷新,处理的数据量更多,消耗的资源可能更多,因此更推荐的应用场景包括:定期报表查看、定期回刷数据等。

说明

更多关于全量刷新的信息请参见全量刷新

增量刷新

增量刷新会感知基表数据的变化,然后将Query中的数据以增量的方式写入到Dynamic Table。相比于全量刷新,增量刷新处理的数据量更少,处理时效性会更高。在实际应用中,如果有分钟级数据查询的需求,更推荐使用增量刷新模式,但在使用增量Dynamic Table时需要注意如下几点:

  • 增量刷新Dynamic Table需要为基表开启Binlog,如果是维表JOIN,维表无需开启Binlog。

  • 基表开启Binlog后,会有一定存储开销,您可参考查看表存储明细查询Binlog占用存储。

  • 开启增量刷新后,系统会在后台生成一张状态表,用于记录中间聚合结果,关于状态表技术原理请参见增量刷新(Incremental)。状态表会存储中间聚合数据,因此会占用一定的存储,查看存储请参见查看Dynamic Table表结构和血缘

当前Hologres增量刷新支持新增数据刷新以及全增量一体刷新,来满足业务的不同诉求,详情使用如下:

增量刷新

对于已经存在的基表,开启Binlog并为其创建增量Dynamic Table后,基表中已经存在的数据不会被同步到Dynamic Table中,只有新增的数据会被同步到Dynamic Table,因此可能会存在数据一致性问题。

全增量一体刷新

当前增量Dynamic Table也支持全增量一体消费,即首先消费基表的全量数据,再消费基表新增的数据,该功能需要通过GUC来控制:

-- 打开全增量数据一体消费的GUC
SET hg_experimental_enable_hybrid_incremental_mode = true;

示例如下:

--全增量一体化增量Dynamic table
-- 未开Binlog的基表
CREATE TABLE base_sales(
  day TEXT NOT NULL,
  hour INT,
  user_id BIGINT,
  ts TIMESTAMPTZ,
  amount FLOAT,
  pk text NOT NULL PRIMARY key
);

-- 导入数据
INSERT INTO base_sales VALUES ('2024-08-29',1,222222,'2024-08-29 16:41:19.141528+08',5,'ddd');


-- 为基表打开Binlog
ALTER TABLE base_sales SET (binlog_level = replica);

-- 再为基表导入增量数据
INSERT INTO base_sales VALUES ('2024-08-29',2,3333,'2024-08-29 17:44:19.141528+08',100,'aaaaa');


-- 创建增量Dynamic Table
CREATE DYNAMIC TABLE sales_incremental
  WITH (refresh_mode='incremental') 
AS 
  SELECT day, hour, SUM(amount), COUNT(1) 
    FROM base_sales 
  GROUP BY day, hour;

-- 打开全增量数据一体消费的GUC,然后刷新dynamic table
SET hg_experimental_enable_hybrid_incremental_mode = true;
REFRESH TABLE sales_incremental;

对比数据一致性:

  • 查询基表

    SELECT day, hour, SUM(amount), COUNT(1) 
        FROM base_sales 
      GROUP BY day, hour;

    返回结果:

    day	    hour	sum	count
    2024-08-29	1	5	1
    2024-08-29	2	100	1
  • 查询Dynamic Table

    SELECT * FROM sales_incremental;

    返回结果:

    day	    hour	sum	count
    2024-08-29	1	5	1
    2024-08-29	2	100	1

使用示例

示例1:创建全量刷新Dynamic Table并自动开始执行刷新

执行下述操作前,请先参考一键导入公共数据集tpch_10g公共数据集的数据导入至Hologres。

--创建 “test” Schema
CREATE SCHEMA test;

--创建单表全量刷新的dynamic table,并立即开始刷新,每1小时刷新一次。
CREATE DYNAMIC TABLE thch_q1_full
  WITH (
    refresh_mode='full',
    auto_refresh_enable='true',
    full_auto_refresh_interval='1 hours',
    full_guc_hg_computing_resource='serverless',
    full_guc_hg_experimental_serverless_computing_required_cores='32'
       ) 
AS
  SELECT
        l_returnflag,
        l_linestatus,
        SUM(l_quantity) AS sum_qty,
        SUM(l_extendedprice) AS sum_base_price,
        SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
        SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
        AVG(l_quantity) AS avg_qty,
        AVG(l_extendedprice) AS avg_price,
        AVG(l_discount) AS avg_disc,
        COUNT(*) AS count_order
FROM
        hologres_dataset_tpch_10g.lineitem
WHERE
        l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
        l_returnflag,
        l_linestatus;

示例2:创建增量刷新的Dynamic Table并指定开始刷新时间

执行下述操作前,请先参考一键导入公共数据集tpch_10g公共数据集的数据导入至Hologres。

创建增量Dynamic Table示例如下:

说明

在创建增量Dynamic Table之前,需要为基表开启Binlog(维表不需要开启)。

--为基表开binlog:
BEGIN;
CALL set_table_property('hologres_dataset_tpch_10g.lineitem', 'binlog.level', 'replica');
COMMIT;

--创建单表赠量刷新的dynamic table,并指定开始刷新时间,每3min刷新一次
CREATE DYNAMIC TABLE public.tpch_q1_incremental 
WITH (
refresh_mode='incremental',
auto_refresh_enable='true',
incremental_auto_refresh_schd_start_time='2024-09-15 23:50:0',
incremental_auto_refresh_interval='3 minutes',
incremental_guc_hg_computing_resource='serverless',
incremental_guc_hg_experimental_serverless_computing_required_cores='30'
) AS SELECT
        l_returnflag,
        l_linestatus,
        COUNT(*) AS count_order
FROM
        hologres_dataset_tpch_10g.lineitem
WHERE
        l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
        l_returnflag,
        l_linestatus
;

示例3:创建多表JOIN的全量刷新Dynamic Table

--创建query为多表join的dynamic table,全量刷新模式,每3小时刷新一次。
CREATE DYNAMIC TABLE test.dt_q_full
  WITH (
    refresh_mode='full',
    auto_refresh_enable='true',
    full_auto_refresh_schd_start_time='immediate',
    full_auto_refresh_interval='3 hours',
    full_guc_hg_computing_resource='serverless',
    full_guc_hg_experimental_serverless_computing_required_cores='64'
  ) 
AS 
SELECT
        o_orderpriority,
        COUNT(*) AS order_count
FROM
        hologres_dataset_tpch_10g.orders
WHERE
        o_orderdate >= DATE '1996-07-01'
        AND o_orderdate < DATE '1996-07-01' + INTERVAL '3' MONTH
        AND EXISTS (
                SELECT
                        *
                FROM
                        hologres_dataset_tpch_10g.lineitem
                WHERE
                        l_orderkey = o_orderkey
                        AND l_commitdate < l_receiptdate
        )
GROUP BY
        o_orderpriority;

示例4:创建维表JOIN的增量刷新Dynamic Table

创建维表JOIN的增量Dynamic Table示例如下:

说明

在创建增量Dynamic Table之前,需要为基表开启Binlog(维表不需要开启)。

维表JOIN的语义是:对每条数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。

假设hm.sale_detailhm.user_info两张表已存在,SQL示例如下:

--为基表开binlog,维表不需要开启
BEGIN;
CALL set_table_property('hm.sale_detail', 'binlog.level', 'replica');
COMMIT;


CREATE DYNAMIC TABLE dt_sales_incremental
  WITH (
    refresh_mode='incremental',
    auto_refresh_enable='true',
    incremental_auto_refresh_schd_start_time='2024-09-15 00:00:00',
    incremental_auto_refresh_interval='5 minutes',
    incremental_guc_hg_computing_resource='serverless',
    incremental_guc_hg_experimental_serverless_computing_required_cores='128') 
AS 
SELECT 
    sale_detail.app_id,
    sale_detail.uid,
    product,
    SUM(sale_detail.gmv) AS sum_gmv,
    sale_detail.order_time,
    user_info.province,
    user_info.city 
FROM hm.sale_detail 
INNER JOIN hm.user_info  FOR SYSTEM_TIME AS OF PROCTIME()
ON sale_detail.uid =user_info.uid
GROUP BY sale_detail.app_id,sale_detail.uid,sale_detail.product,sale_detail.order_time,user_info.province,user_info.city;

示例5:创建分区Dynamic Table

  1. 准备基表和数据。

    -- 创建分区源表
    CREATE TABLE base_sales(
      uid INT,
      opreate_time TIMESTAMPTZ,
      amount FLOAT,
      pk text NOT NULL,
      ds text,
      PRIMARY key(ds)
    ) PARTITION BY LIST (ds) ;
    
    CREATE TABLE base_sales_20240616 PARTITION OF base_sales FOR VALUES IN ('20240616');
    
    INSERT INTO base_sales_20240616 VALUES (1,'2024-06-16 16:08:25.387466+08','2','1','20240616');
  2. 创建分区Dynamic Table父表,父表只设置Query的定义,不设置刷新模式。

    --创建extension
    CREATE EXTENSION roaringbitmap;
    
    CREATE DYNAMIC TABLE test.partition_dt_base_sales
    PARTITION BY LIST (ds)
    AS
    SELECT  
     public.RB_BUILD_AGG(uid),
     opreate_time,
     amount,
     pk,
     ds,
    COUNT(1)
    FROM    base_sales
    GROUP BY opreate_time ,amount,pk,ds;
  3. 手动创建分区子表,并为分区子表设置刷新模式。

    -- 创建Dynamic Table分区子表,并指定刷新模式为全量刷新,建表成功后立即开始启动刷新,刷新间隔为30mins,并使用本实例资源刷新。
    CREATE DYNAMIC TABLE test.partition_dt_base_sales_20240616 PARTITION OF test.partition_dt_base_sales FOR VALUES IN ('20240616')
      WITH (
        refresh_mode='full',
        auto_refresh_enable='true',
        full_auto_refresh_schd_start_time='immediate',
        full_auto_refresh_interval='30 minutes'
           );

下一步:管理Dynamic Table

Dynamic Table创建成功后,您可以执行如下操作: